{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "ETL Pipelines with Prefect\n",
    "==========================\n",
    "\n",
    "[Prefect](https://github.com/PrefectHQ/prefect) is a platform for automating data workflows.  Data engineers and data scientists can build, test and deploy production pipelines without worrying about all of the [\"negative engineering\" aspects](https://medium.com/the-prefect-blog/positive-and-negative-data-engineering-a02cb497583d) of production.  For example, Prefect makes it easy to deploy a workflow that runs on a complicated schedule, requires task retries in the event of failures, and sends notifications when certain tasks are complete.  Prefect was built on top of Dask, and [relies on Dask](http://stories.dask.org/en/latest/prefect-workflows.html#how-dask-helps) to schedule and manage the execution of a Prefect workflow in a distributed environment.\n",
    "\n",
    "This example demonstrates running a Prefect ETL Flow on Dask which ultimately creates a GIF.  While this is a somewhat unconventional use case of Prefect, we're no strangers to [unconventional use cases](https://medium.com/the-prefect-blog/prefect-runs-on-prefect-3e6df553c3a4).\n",
    "\n",
    "In the world of workflow engines, Prefect supports many unique features; in this particular example we will see:\n",
    "\n",
    "- parametrization of workflows\n",
    "- dynamic runtime \"mapping\" of workflow tasks\n",
    "- customizable execution logic\n",
    "\n",
    "You wouldn't get this from any other engine.\n",
    "\n",
    "**Contents**\n",
    "\n",
    "0. [Description of goal](#goal)\n",
    "1. [Setting up our environment](#environment)\n",
    "2. Building our Flow\n",
    "    1. [Extract](#extract)\n",
    "    1. [Transform](#transform)\n",
    "    1. [Load](#load)\n",
    "    1. [Putting the pieces together](#build)\n",
    "3. [Running our Flow on Dask](#dask)\n",
    "4. [Watching our GIF](#play)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<a id=\"goal\"></a>\n",
    "\n",
    "Goal\n",
    "----\n",
    "\n",
    "To demonstrate how Prefect and Dask work together, we are going to build and execute a standard \"Extract / Transform / Load\" (ETL) workflow for processing some basic image data.  Most ETL workflows involve a scheduled migration of data from one database to another.  In our case, we will be moving data from a file located at a known URL to our local hard disk, converting the individual file into a series of frames, and compiling those frames into a GIF.  The URL references a file containing raw bytes such as:\n",
    "\n",
    "```python\n",
    "b\"\"\"aÙ\u0001w\u0001˜\u0000\u0000≠•∆≠≠ﬁ#!\b\u0015\u0016\u0003≠≠÷≠•Ω≠úΩ••µú•µîúµ•úΩ••Ω3&\u0015µ•Ω\u0018!\u000b",
    "µ≠∆≠•¥4(%µú∑≠≠Œ≠î≠≠≠∆≠îµúî≠úîµE5.≠ú≠≠•Œµµﬁ••∆•≠ŒµµŒúúΩ62&)1&623µ•∆Ωµ÷úî•ßjxΩΩÁú•Ωµ≠Œ••≠ú•≠Ω≠∆≠µÁâUV≠µ‹ΩµŒîî•NC5µ≠Ÿôãô•î•µ•µîú≠#\u0017\bVHCuhl≠≠ΩôchâRIoc]™≠Á≠î•™ú»öis•ú•f7,íYfL9?îî≠≠•÷∑ò™gWVxGEΩ≠–))1qB5µ≠Ω81\u0018R,\u0015´tÜñWV\u001e",
    "!\u001e",
    "HCDBB5;5?\n",
    "\"\"\"\n",
    "```\n",
    "\n",
    "The steps of our workflow will be as follows:\n",
    "- Extract: pull the data file from a URL (speicified by a `Parameter`) to disk\n",
    "- Transform: split the file into multiple files, each corresponding to a single frame\n",
    "- Load: Store each frame individually, and compile the frames together into a GIF\n",
    "\n",
    "Once we have built our Flow, we can execute it with different values for the `Parameter` or even run it on a nightly schedule.\n",
    "\n",
    "**NOTE:** If we planned on executing this Flow in a truly distributed environment, writing the images to the local filesystem would _not_ be appropriate.  We would instead use an external datastore such as Google Cloud Storage, or a proper database."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<a id=\"environment\"></a>\n",
    "\n",
    "Setting up our environment\n",
    "--------------------------\n",
    "\n",
    "Before proceeding, we need to install both the [`prefect`](https://pypi.org/project/prefect/) and [`imageio`](https://pypi.org/project/imageio/) packages."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!pip install imageio prefect[viz]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<a id=\"extract\"></a>\n",
    "\n",
    "### Extract\n",
    "\n",
    "First, we will define our tasks for _extracting_ the image data file from a given URL and saving it to a given file location.  To do so, we will utilize two methods for creating Prefect Tasks:\n",
    "- the `task` decorator for converting any Python function into a task\n",
    "- a pre-written, configurable Task from the [Prefect \"Task Library\"](https://docs.prefect.io/guide/task_library/) which helps us abstract some standard boilerplate\n",
    "\n",
    "Additionally, we will utilize the following Prefect concepts:\n",
    "- a [Prefect signal](https://docs.prefect.io/guide/core_concepts/execution.html#state-signals) for marking this task and its downstream depedencies as successfully \"Skipped\" if the file is already present in our local filesystem\n",
    "- retry semantics: if, for whatever reason, our `curl` command fails to connect, we want it to retry up to 2 times with a 10 second delay.  This way, if we run this workflow on a schedule we won't need to concern ourselves with temporary intermittent connection issues.\n",
    "\n",
    "Right now we are simply defining our individual tasks - we won't actually set up our dependency structure until we create the full Flow."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import datetime\n",
    "import os\n",
    "\n",
    "import prefect\n",
    "from prefect import task\n",
    "from prefect.engine.signals import SKIP\n",
    "from prefect.tasks.shell import ShellTask\n",
    "\n",
    "\n",
    "@task\n",
    "def curl_cmd(url: str, fname: str) -> str:\n",
    "    \"\"\"\n",
    "    The curl command we wish to execute.\n",
    "    \"\"\"\n",
    "    if os.path.exists(fname):\n",
    "        raise SKIP(\"Image data file already exists.\")\n",
    "    return \"curl -fL -o {fname} {url}\".format(fname=fname, url=url)\n",
    "\n",
    "\n",
    "# ShellTask is a task from the Task library which will execute a given command in a subprocess\n",
    "# and fail if the command returns a non-zero exit code\n",
    "\n",
    "download = ShellTask(name=\"curl_task\", max_retries=2, retry_delay=datetime.timedelta(seconds=10))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<a id=\"transform\"></a>\n",
    "\n",
    "### Transform\n",
    "\n",
    "Next up, we need to define our task which loads the image data file and splits it into multiple frames.  In this case, each frame is delimited by 4 newlines.  Note that, in the event the previous two tasks are \"Skipped\", the default behavior in Prefect is to skip downstream dependencies as well.  However, as with most things in Prefect, this behavior is customizable.  In this case, we want this task to run regardless of whether the upstreams skipped or not, so we set the `skip_on_upstream_skip` flag to `False`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "@task(skip_on_upstream_skip=False)\n",
    "def load_and_split(fname: str) -> list:\n",
    "    \"\"\"\n",
    "    Loads image data file at `fname` and splits it into\n",
    "    multiple frames.  Returns a list of bytes, one element\n",
    "    for each frame.\n",
    "    \"\"\"\n",
    "    with open(fname, \"rb\") as f:\n",
    "        images = f.read()\n",
    "        \n",
    "    return [img for img in images.split(b\"\\n\" * 4) if img]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<a id=\"load\"></a>\n",
    "\n",
    "### Load\n",
    "\n",
    "Finally, we want to write our frames to disk as well as combine the frames into a single GIF.  In order to achieve this goal, we are going to utilize [Prefect's task \"mapping\" feature](https://docs.prefect.io/guide/core_concepts/mapping.html) which conveniently spawns new tasks in response to upstream outputs.  In this case, we will write a single task for writing an image to disk, and \"map\" this task over all the image frames returned by `load_and_split` above!  To infer which frame we are on, we look in [`prefect.context`](https://docs.prefect.io/guide/core_concepts/execution.html#context).\n",
    "\n",
    "Additionally, we can \"reduce\" over a mapped task - in this case, we will take the collection of mapped tasks and pass them into our `combine_to_gif` task for creating and saving our GIF."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "@task\n",
    "def write_to_disk(image: bytes) -> bytes:\n",
    "    \"\"\"\n",
    "    Given a single image represented as bytes, writes the image\n",
    "    to the present working directory with a filename determined\n",
    "    by `map_index`.  Returns the image bytes.\n",
    "    \"\"\"\n",
    "    frame_no = prefect.context.get(\"map_index\")\n",
    "    with open(\"frame_{0:0=2d}.gif\".format(frame_no), \"wb\") as f:\n",
    "        f.write(image)\n",
    "    return image"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import imageio\n",
    "from io import BytesIO\n",
    "\n",
    "\n",
    "@task\n",
    "def combine_to_gif(image_bytes: list) -> None:\n",
    "    \"\"\"\n",
    "    Given a list of ordered images represented as bytes,\n",
    "    combines them into a single GIF stored in the present working directory.\n",
    "    \"\"\"\n",
    "    images = [imageio.imread(BytesIO(image)) for image in image_bytes]\n",
    "    imageio.mimsave('./clip.gif', images)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<a id=\"build\"></a>\n",
    "\n",
    "Build the Flow\n",
    "---------------\n",
    "\n",
    "Finally, we need to put our tasks together into a Prefect \"Flow\".  Similar to Dask's `delayed` interface, all computation is deferred and no Task code will be executed in this step.  Because Prefect maintains a stricter contract between tasks and additionally needs the ability to run in non-Dask execution environments, the mechanism for deferring execution is independent of Dask.\n",
    "\n",
    "In addition to the tasks we have already defined, we introduce two \"Parameters\" for specifying the URL and local file location of our data.  At runtime, we can optionally override these tasks to return different values."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from prefect import Parameter, Flow\n",
    "\n",
    "\n",
    "DATA_URL = Parameter(\"DATA_URL\", \n",
    "                     default=\"https://github.com/cicdw/image-data/blob/master/all-images.img?raw=true\")\n",
    "\n",
    "DATA_FILE = Parameter(\"DATA_FILE\", default=\"image-data.img\")\n",
    "\n",
    "\n",
    "with Flow(\"Image ETL\") as flow:\n",
    "    \n",
    "    # Extract\n",
    "    command = curl_cmd(DATA_URL, DATA_FILE)\n",
    "    curl = download(command=command)\n",
    "    \n",
    "    # Transform\n",
    "    # we use the `upstream_tasks` keyword to specify non-data dependencies\n",
    "    images = load_and_split(fname=DATA_FILE, upstream_tasks=[curl])\n",
    "    \n",
    "    # Load  \n",
    "    frames = write_to_disk.map(images)\n",
    "    result = combine_to_gif(frames)\n",
    "    \n",
    "\n",
    "flow.visualize()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<a id=\"dask\"></a>\n",
    "\n",
    "### Running the Flow on Dask\n",
    "\n",
    "Now we have built our Flow, independently of Dask.  We could execute this Flow sequentially, Task after Task, but there is inherent parallelism in our mapping of the images to files that we want to exploit.  Luckily, Dask makes this easy to achieve.\n",
    "\n",
    "First, we will start a local Dask cluster.  Then, we will run our Flow against Prefect's `DaskExecutor`, which will submit each Task to our Dask cluster and use Dask's distributed scheduler for determining when and where each Task should run.  Essentially, we built a Directed Acylic Graph (DAG) and are simply \"submitting\" that DAG to Dask for handling its execution in a distributed way."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "# start our Dask cluster\n",
    "from dask.distributed import Client\n",
    "\n",
    "\n",
    "client = Client(n_workers=4, threads_per_worker=1)\n",
    "\n",
    "# point Prefect's DaskExecutor to our Dask cluster\n",
    "\n",
    "from prefect.engine.executors import DaskExecutor\n",
    "\n",
    "executor = DaskExecutor(address=client.scheduler.address)\n",
    "flow.run(executor=executor)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next Steps\n",
    "----------\n",
    "\n",
    "Now that we've built our workflow, what next?  The interested reader should try to:\n",
    "\n",
    "- run the Flow again to see how the `SKIP` signal behaves\n",
    "- use different parameters for both the URL and the file location (Parameter values can be overriden by simply passing their names as keyword arguments to `flow.run()`)\n",
    "- introduce a new Parameter for the filename of the final GIF\n",
    "- use Prefect's [scheduler interface](https://docs.prefect.io/guide/core_concepts/schedules.html) to run our workflow on a schedule"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<a id=\"play\"></a>\n",
    "\n",
    "Play\n",
    "----\n",
    "\n",
    "Finally, let's watch our creation!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from IPython.display import HTML\n",
    "\n",
    "HTML('<img src=\"./clip.gif\" alt=\"Rick Daskley\">')"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.4"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
